Kubernetes 源码笔记(kube-scheduler)

kube-scheduler 运行在 Kubernetes 的管理节点(Master 节点)上,负责完成从 Pod 到 Node 的调度过程。Scheduler 会跟踪集群中所有 Node 的资源利用情况,并采取合适的调度策略,确保调度的均衡性,避免集群中的某些节点过载。

一言以蔽之,kube-scheduler 用来为 Pod 找到一个合适的 Node。

基本原理

kube-scheduler 会对 Pod 和 Node 进行 Watch:当监测到未被调度的 pod(spec.nodeName 为空)时,它会取出这个 pod,根据内部设定的调度算法选出合适的 node,并通过 api-server 把绑定结果写回 etcd;之后对应 node 上的 kubelet 会读取到这一信息,负责真正运行该 pod。
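下面用一个独立的小程序示意"未被调度的 pod 即 spec.nodeName 为空"这一点(示意代码,基于 client-go 编写,并非 kube-scheduler 源码;kubeconfig 路径为假设值,且采用的是 1.12 时代 client-go 的接口,新版本的 List 需要额外传入 context):

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // 假设通过本地 kubeconfig 构造 clientset(kube-scheduler 实际是通过 SharedInformer 获取数据)
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // spec.nodeName 为空即表示 pod 尚未被调度到任何 node
    pods, err := clientset.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{
        FieldSelector: "spec.nodeName=",
    })
    if err != nil {
        panic(err)
    }
    for _, p := range pods.Items {
        fmt.Printf("pending pod: %s/%s\n", p.Namespace, p.Name)
    }
}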

基本流程

  • 客户端通过 api-server 创建 pod,相关数据存储到 etcd
  • kube-scheduler 通过 NodeLister 获取所有节点信息
  • 将 scheduled pods 和 assumed pods 合并为 pods,作为所有已调度 Pod 的信息
  • 从 pods 中整理出 node-pods 的对应关系表 nodeNameToInfo
  • 过滤掉不合适的节点(Predicates 预选)
  • 给剩下的节点依次打分(Priorities 优选)
  • 选择得分最高的节点;若最高分有多个节点并列,则以 round-robin 的方式从中选出一个,再调用 api 完成 pod 和 node 的绑定,结果存储到 etcd 里(整体流程可参考下面的示意代码)
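为了便于把握整体结构,这里给出一段与真实实现无关的极简示意,仅用来体现"取出 pod → 预选 → 优选 → 绑定"的调用顺序(所有函数名均为杜撰的玩具实现):

package main

import "fmt"

type pod struct{ name string }

type node struct {
    name  string
    score int
}

// 以下函数只是占位实现,用来示意调度循环的结构
func nextUnscheduledPod() pod { return pod{name: "demo-pod"} }

func listNodes() []node { return []node{{name: "node-1"}, {name: "node-2"}} }

// 预选:过滤掉不满足条件的节点(这里不做任何过滤)
func predicates(p pod, nodes []node) []node { return nodes }

// 优选:给每个节点打分(这里简单地按名字长度打分)
func priorities(p pod, nodes []node) []node {
    for i := range nodes {
        nodes[i].score = len(nodes[i].name)
    }
    return nodes
}

// 选出得分最高的节点
func selectHost(nodes []node) node {
    best := nodes[0]
    for _, n := range nodes {
        if n.score > best.score {
            best = n
        }
    }
    return best
}

func bind(p pod, n node) { fmt.Printf("bind %s -> %s\n", p.name, n.name) }

func main() {
    p := nextUnscheduledPod()
    feasible := predicates(p, listNodes())
    ranked := priorities(p, feasible)
    bind(p, selectHost(ranked))
}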

调度策略和算法

k8s 里的调度策略和算法包括预选(Predicates)和优选(Priorities)两个步骤,通俗来说就是先过滤、再打分。

借助下图可以方便地理解(图片来自 DockOne 微信分享(一四九):Kubernetes 调度详解):

(图:kube_scheduler_algo,Predicates 预选与 Priorities 优选的流程示意)

Predicates 预选

根据配置的 Predicates Policies(默认为 DefaultProvider 中定义的 default predicates policies 集合)来过滤掉不满足 Policies 的 Nodes,避免资源冲突,节点超载。

典型的 Predicates 算法有:

  • GeneralPredicates:包含一些基本的筛选规则,主要考虑资源问题,比如 CPU、内存是否足够,端口是否冲突,selector 是否匹配
  • NoDiskConflict:检查 Pod 所需的卷是否与节点上已存在的卷冲突,比如节点已经挂载了某个卷,其他同样使用这个卷的 Pod 就不能再调度到这个节点
  • NoVolumeZoneConflict:当集群跨可用区部署时,检查 node 所在的 zone 是否满足 Pod 对卷的要求
  • MaxEBSVolumeCount:部署在 AWS 时,检查 node 是否挂载了过多的 EBS 卷
  • MaxGCEPDVolumeCount:部署在 GCE 时,检查 node 是否挂载了过多的 PD 卷
  • PodToleratesNodeTaints:检查 Pod 是否能够容忍 node 上所有的 taints
  • CheckNodeMemoryPressure:当 Pod QoS 为 BestEffort 时,检查 node 是否处于内存压力状态,排除内存压力过大的 node
  • MatchInterPodAffinity:检查 node 是否满足 Pod 的亲和性、反亲和性需求
  • HostName:节点需满足 PodSpec 的 NodeName 字段指定的主机名
  • CheckNodeDiskPressure:判断节点是否已经处于磁盘压力状态

predicates 相关的算法在 pkg/scheduler/algorithm/predicates/predicates.go 中。

Priorities 优选

根据配置的 Priorities Policies(默认为 DefaultProvider 中定义的 default priorities policies 集合)给预选通过的 Nodes 打分排名,得分最高的 Node 即为最合适的 Node,该 Pod 会被绑定到这个 Node 上。如果最高分出现并列,则以 round-robin 的方式从中选择一个 Node。

典型的 Priority 算法有:

  • LeastRequestedPriority:按 node 的计算资源(CPU/MEM)剩余量打分,越空闲的 node 分数越高
  • BalancedResourceAllocation:对 LeastRequestedPriority 的补充,使 CPU 和 MEM 的使用率尽量均衡
  • SelectorSpreadPriority:同一个 Service/RC 下的 Pod 应尽可能分散在集群里,node 上运行的同个 Service/RC 下的 Pod 数目越少,分数越高
  • NodeAffinityPriority:按 soft(preferred)NodeAffinity 规则的匹配情况打分,命中的规则越多,分数越高
  • TaintTolerationPriority:按 Pod 的 tolerations 与 node 的 taints 的匹配情况打分,不能容忍的 taints 越多,分数越低
  • InterPodAffinityPriority:按 soft(preferred)Pod Affinity/Anti-Affinity 规则的匹配情况打分,Affinity 命中越多分数越高,Anti-Affinity 命中越多分数越低

priorities 相关的算法在 pkg/scheduler/algorithm/priorities/ 目录下。

最终主机的得分由以下公式计算得到:

finalScoreNode = (weight1 * priorityFunc1) + (weight2 * priorityFunc2) + … + (weightn * priorityFuncn)
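举个简化的例子(数值仅为示意):假设只启用两个优选算法且权重都为 1,某 node 在 LeastRequestedPriority 上得 8 分,在 BalancedResourceAllocation 上得 6 分,那么它的最终得分就是 1*8 + 1*6 = 14;所有候选 node 中得分最高者(或并列最高者之一)即为调度结果。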

代码解析

分析的代码基于 v1.12.2-beta.0 版本。之前很长一段时间,Scheduler 的源码位于 plugin 目录下;在 v1.12.2-beta.0 中,核心代码在 pkg/scheduler 下,入口程序在 cmd/kube-scheduler/scheduler.go 里。同 Kubernetes 的其他组件类似,它先通过 command := app.NewSchedulerCommand() 获取 cobra 的 command 对象然后执行,实际的运行过程在 cmd/kube-scheduler/app/server.go 中。
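入口文件的内容大致如下(示意,省略了 flag 与日志初始化等细节,以对应版本源码为准):

package main

import (
    "fmt"
    "math/rand"
    "os"
    "time"

    "k8s.io/kubernetes/cmd/kube-scheduler/app"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    // NewSchedulerCommand 返回 cobra 的 *Command,真正的启动逻辑在 app.Run 中
    command := app.NewSchedulerCommand()
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}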

总体逻辑

初始化

func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
algorithmprovider.ApplyFeatureGates()

// Configz registration ...

schedulerConfig, err := NewSchedulerConfig(c)
if err != nil {
return err
}
// Create the scheduler ...

// Prepare the event broadcaster ...

// Start up the healthz server ...

// Start all informers
go c.PodInformer.Informer().Run(stopCh)
c.InformerFactory.Start(stopCh)

// Wait for all caches to sync before scheduling
c.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced)

run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()

// If leader election is enabled, run via LeaderElector until done and exit
// ...

run(ctx)
return fmt.Errorf("finished without leader elect")
}

Run 方法会根据传入的 CompletedConfig 初始化一个 schedulerConfig 对象,再据此创建 Scheduler 对象,启动所有 informer 并等待缓存同步,最后执行核心逻辑 run:run 会调用 pkg/scheduler/scheduler.go 中 Scheduler 的 Run 方法启动调度循环,然后阻塞在 ctx.Done() 上,直到从通道收到退出信号才会返回。

func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

其中 scheduleOne 方法为:

func (sched *Scheduler) scheduleOne() {
pod := sched.config.NextPod()
if pod.DeletionTimestamp != nil {
sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
return
}

glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

// Synchronously attempt to find a fit for the pod.
start := time.Now()
suggestedHost, err := sched.schedule(pod)
if err != nil {
if fitError, ok := err.(*core.FitError); ok {
preemptionStartTime := time.Now()
sched.preempt(pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
}
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
assumedPod := pod.DeepCopy()

allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
if err != nil {
return
}

err = sched.assume(assumedPod, suggestedHost)
if err != nil {
return
}
go func() {
if !allBound {
err = sched.bindVolumes(assumedPod)
if err != nil {
return
}
}

err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: suggestedHost,
},
})
metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {
glog.Errorf("Internal error binding pod: (%v)", err)
}
}()
}

scheduleOne 由上面的 Run 方法通过 wait.Until 循环调用,每次调度一个 pod:首先调用 NextPod 从未调度队列中取出一个待调度的 Pod,接着进行节点的选择。

选择节点

其中 suggestedHost, err := sched.schedule(pod) 这一行调用了真正的调度算法。

func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
    host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
    if err != nil {
        pod = pod.DeepCopy()
        sched.config.Error(pod, err)
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
        sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
            Type:    v1.PodScheduled,
            Status:  v1.ConditionFalse,
            Reason:  v1.PodReasonUnschedulable,
            Message: err.Error(),
        })
        return "", err
    }
    return host, err
}

schedule 返回一个最合适的 node;在 scheduleOne 中可以看到,拿到结果后接下来就是调用 bind 完成 pod 与 node 的绑定。其中 sched.config.Algorithm.Schedule 才是真正的调度算法入口。以上就是 kube-scheduler 的基本逻辑,下面深入其中值得注意的细节。

细节

获取配置信息

在上述的 Run 方法中,进入 Scheduler 核心逻辑前的初始化很重要:这里会初始化 Node、Pod 等资源的 Informer,确定 Predicates 阶段和 Priorities 阶段所用的调度算法,并据此完成相应对象的构造。某种意义上这也是一个依赖注入的过程。

func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) {
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses()
}

configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: s.ComponentConfig.SchedulerName,
Client: s.Client,
NodeInformer: s.InformerFactory.Core().V1().Nodes(),
PodInformer: s.PodInformer,
PvInformer: s.InformerFactory.Core().V1().PersistentVolumes(),
PvcInformer: s.InformerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: s.InformerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: s.InformerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: s.InformerFactory.Apps().V1().StatefulSets(),
ServiceInformer: s.InformerFactory.Core().V1().Services(),
PdbInformer: s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: s.ComponentConfig.HardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
DisablePreemption: s.ComponentConfig.DisablePreemption,
PercentageOfNodesToScore: s.ComponentConfig.PercentageOfNodesToScore,
BindTimeoutSeconds: *s.ComponentConfig.BindTimeoutSeconds,
})

source := s.ComponentConfig.AlgorithmSource
var config *scheduler.Config
switch {
case source.Provider != nil:
sc, err := configurator.CreateFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
config = sc
case source.Policy != nil:
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
// Use policy config file define policy ...
case source.Policy.ConfigMap != nil:
// Use ConfigMap define policy ...
}
sc, err := configurator.CreateFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
config = sc
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
config.Recorder = s.Recorder

config.DisablePreemption = s.ComponentConfig.DisablePreemption
return config, nil
}

在 NewSchedulerConfig 方法中,有两种创建 scheduler.Config 的方式,由配置决定:用户可以编写 policy 文件(或 ConfigMap),指定调度器使用哪些 predicates 和 priorities 算法(这些算法在 pkg/scheduler/algorithm 中定义);也可以通过 algorithm provider 指定。两种方式最终都是为了得到 predicates 和 priorities 方法的集合。以默认的 algorithm provider 为例,CreateFromProvider 接口在 pkg/scheduler/scheduler.go 中声明,实现在 pkg/scheduler/factory/factory.go 中:

func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName)
if err != nil {
return nil, err
}

return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
}

// ...

func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
}

predicateFuncs, err := c.GetPredicates(predicateKeys)
if err != nil {
return nil, err
}

priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
if err != nil {
return nil, err
}

priorityMetaProducer, err := c.GetPriorityMetadataProducer()
if err != nil {
return nil, err
}

predicateMetaProducer, err := c.GetPredicateMetadataProducer()
if err != nil {
return nil, err
}

if c.enableEquivalenceClassCache {
c.equivalencePodCache = equivalence.NewCache()
glog.Info("Created equivalence class cache")
}

algo := core.NewGenericScheduler(
c.schedulerCache,
c.equivalencePodCache,
c.podQueue,
predicateFuncs,
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
extenders,
c.volumeBinder,
c.pVCLister,
c.alwaysCheckAllPredicates,
c.disablePreemption,
c.percentageOfNodesToScore,
)

podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
SchedulerCache: c.schedulerCache,
Ecache: c.equivalencePodCache,
NodeLister: &nodeLister{c.nodeLister},
Algorithm: algo,
GetBinder: c.getBinderFunc(extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},
NextPod: func() *v1.Pod {
return c.getNextPod()
},
Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
}, nil
}

CreateFromKeys 会根据定义的 predicate 和 priority 的 key 生成一个 scheduler。主要的调度算法都在 pkg/scheduler/core/generic_scheduler.go 中定义。
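这些 key 是在 pkg/scheduler/algorithm/algorithmprovider/defaults/defaults.go 初始化时注册的,大致形式如下(示意片段,函数与常量名以实际源码为准):

// 示意:defaults.go 在初始化时把一组 predicate/priority key 注册为 DefaultProvider
func registerAlgorithmProvider(predSet, priSet sets.String) {
    // RegisterAlgorithmProvider 把 key 集合与 provider 名字关联起来,
    // CreateFromProvider 再通过 GetAlgorithmProvider(providerName) 取回这些 key
    factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
}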

调度逻辑

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
defer trace.LogIfLong(100 * time.Millisecond)

if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return "", err
}

nodes, err := nodeLister.List()
if err != nil {
return "", err
}
if len(nodes) == 0 {
return "", ErrNoNodesAvailable
}

// Used for all fit and priority funcs.
err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
return "", err
}

trace.Step("Computing predicates")
startPredicateEvalTime := time.Now()
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
if err != nil {
return "", err
}

if len(filteredNodes) == 0 {
return "", &FitError{
Pod: pod,
NumAllNodes: len(nodes),
FailedPredicates: failedPredicateMap,
}
}
metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))

trace.Step("Prioritizing")
startPriorityEvalTime := time.Now()
// When only one node after predicate, just use it.
if len(filteredNodes) == 1 {
metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
return filteredNodes[0].Name, nil
}

metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
if err != nil {
return "", err
}
metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))

trace.Step("Selecting host")
return g.selectHost(priorityList)
}

如前所述,调度分为几个关键步骤:首先通过 nodeLister 获取可调度的 nodes,并更新 cache 中的 node 信息,接着预选,筛除不合适的 node,然后优选打分,选出最合适的 node;如果最高分的 node 有多个,则以 round-robin 的方式从中选出一个作为最终结果。

Predicate
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}

if len(g.predicates) == 0 {
filtered = nodes
} else {
allNodes := int32(g.cache.NodeTree().NumNodes)
numNodesToFind := g.numFeasibleNodesToFind(allNodes)

filtered = make([]*v1.Node, numNodesToFind)
errs := errors.MessageCountMap{}
var (
predicateResultLock sync.Mutex
filteredLen int32
equivClass *equivalence.Class
)

ctx, cancel := context.WithCancel(context.Background())

meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)

if g.equivalenceCache != nil {
equivClass = equivalence.NewClass(pod)
}

checkNode := func(i int) {
var nodeCache *equivalence.NodeCache
nodeName := g.cache.NodeTree().Next()
if g.equivalenceCache != nil {
nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
}
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
g.cachedNodeInfoMap[nodeName],
g.predicates,
g.cache,
nodeCache,
g.schedulingQueue,
g.alwaysCheckAllPredicates,
equivClass,
)
if err != nil {
predicateResultLock.Lock()
errs[err.Error()]++
predicateResultLock.Unlock()
return
}
if fits {
length := atomic.AddInt32(&filteredLen, 1)
if length > numNodesToFind {
cancel()
atomic.AddInt32(&filteredLen, -1)
} else {
filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
}
} else {
predicateResultLock.Lock()
failedPredicateMap[nodeName] = failedPredicates
predicateResultLock.Unlock()
}
}

workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

filtered = filtered[:filteredLen]
if len(errs) > 0 {
return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
}
}

if len(filtered) > 0 && len(g.extenders) != 0 {
for _, extender := range g.extenders {
if !extender.IsInterested(pod) {
continue
}
filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
if err != nil {
if extender.IsIgnorable() {
glog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
extender, err)
continue
} else {
return []*v1.Node{}, FailedPredicateMap{}, err
}
}

for failedNodeName, failedMsg := range failedMap {
if _, found := failedPredicateMap[failedNodeName]; !found {
failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
}
failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
}
filtered = filteredList
if len(filtered) == 0 {
break
}
}
}
return filtered, failedPredicateMap, nil
}

Predicate 是预选的过程。其中 checkNode 函数会调用 podFitsOnNode,把所有配置的预选 Policy 逐一作用到 Node 上进行检查;workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode) 则启动 16 个 goroutine,对全部 node 并发执行这一检查,一旦找到 numNodesToFind 个可行节点就通过 cancel() 提前结束。Extender 是调度算法的一种扩展机制,也是自定义调度的一种方式,如果配置了 Extender,预选之后还会调用 extender.Filter 再次筛选。
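workqueue.ParallelizeUntil 的使用模式可以用下面这个独立的小例子说明(示意代码,与调度器本身无关,仅用于演示该并发模式):

package main

import (
    "context"
    "fmt"
    "sync/atomic"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    nodes := []string{"node-1", "node-2", "node-3", "node-4"}
    var fitCount int32

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    checkNode := func(i int) {
        // 这里用"节点名长度为偶数"模拟一次预选检查
        if len(nodes[i])%2 == 0 {
            atomic.AddInt32(&fitCount, 1)
        }
    }

    // 16 个 worker 并发处理 len(nodes) 个任务;ctx 被 cancel 后不再派发新任务
    workqueue.ParallelizeUntil(ctx, 16, len(nodes), checkNode)
    fmt.Println("fit nodes:", fitCount)
}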

Priority
func PrioritizeNodes(
pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
meta interface{},
priorityConfigs []algorithm.PriorityConfig,
nodes []*v1.Node,
extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
if len(priorityConfigs) == 0 && len(extenders) == 0 {
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
if err != nil {
return nil, err
}
result = append(result, hostPriority)
}
return result, nil
}

var (
mu = sync.Mutex{}
wg = sync.WaitGroup{}
errs []error
)
appendError := func(err error) {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}

results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

for i, priorityConfig := range priorityConfigs {
if priorityConfig.Function != nil {
wg.Add(1)
go func(index int, config algorithm.PriorityConfig) {
defer wg.Done()
var err error
results[index], err = config.Function(pod, nodeNameToInfo, nodes)
if err != nil {
appendError(err)
}
}(i, priorityConfig)
} else {
results[i] = make(schedulerapi.HostPriorityList, len(nodes))
}
}

processNode := func(index int) {
nodeInfo := nodeNameToInfo[nodes[index].Name]
var err error
for i := range priorityConfigs {
if priorityConfigs[i].Function != nil {
continue
}
results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
if err != nil {
appendError(err)
results[i][index].Host = nodes[index].Name
}
}
}
workqueue.Parallelize(16, len(nodes), processNode)
for i, priorityConfig := range priorityConfigs {
if priorityConfig.Reduce == nil {
continue
}
wg.Add(1)
go func(index int, config algorithm.PriorityConfig) {
defer wg.Done()
if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
appendError(err)
}
if glog.V(10) {
for _, hostPriority := range results[index] {
glog.Infof("%v -> %v: %v, Score: (%d)", pod.Name, hostPriority.Host, config.Name, hostPriority.Score)
}
}
}(i, priorityConfig)
}
wg.Wait()
if len(errs) != 0 {
return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
}

result := make(schedulerapi.HostPriorityList, 0, len(nodes))

for i := range nodes {
result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
for j := range priorityConfigs {
result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
}
}

if len(extenders) != 0 && nodes != nil {
combinedScores := make(map[string]int, len(nodeNameToInfo))
for _, extender := range extenders {
if !extender.IsInterested(pod) {
continue
}
wg.Add(1)
go func(ext algorithm.SchedulerExtender) {
defer wg.Done()
prioritizedList, weight, err := ext.Prioritize(pod, nodes)
if err != nil {
return
}
mu.Lock()
for i := range *prioritizedList {
host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
combinedScores[host] += score * weight
}
mu.Unlock()
}(extender)
}
wg.Wait()
for i := range result {
result[i].Score += combinedScores[result[i].Host]
}
}

if glog.V(10) {
for i := range result {
glog.V(10).Infof("Host %s => Score %d", result[i].Host, result[i].Score)
}
}
return result, nil
}

Priority 是优选的过程。priority 算法有两种形式:老式的 Function 会对所有 node 一次性打分,在独立的 goroutine 中执行;Map/Reduce 形式则由 processNode 对每个 node 依次执行所有 priority 的 Map 函数得到原始分,再由对应的 Reduce 函数做归一化。与预选类似,workqueue.Parallelize(16, len(nodes), processNode) 用 16 个 goroutine 并发处理所有 node,最后把各个 priority 的得分乘以权重并求和,得到每个 node 的最终分数;若配置了 Extender,还会把 Extender 的打分累加进来。
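Map/Reduce 风格的 priority 可以用下面这个与 Kubernetes 无关的小例子来理解:Map 按 node 产出原始分,Reduce 再做一次归一化,最后乘以权重(纯示意代码,打分方式为杜撰):

package main

import "fmt"

type hostPriority struct {
    host  string
    score int
}

// map 阶段:对单个 node 打原始分(这里用剩余 CPU 毫核数代替)
func leastRequestedMap(freeMilliCPU map[string]int, host string) hostPriority {
    return hostPriority{host: host, score: freeMilliCPU[host]}
}

// reduce 阶段:把所有 node 的原始分归一化到 0-10
func normalizeReduce(result []hostPriority) {
    max := 0
    for _, hp := range result {
        if hp.score > max {
            max = hp.score
        }
    }
    for i := range result {
        if max > 0 {
            result[i].score = result[i].score * 10 / max
        }
    }
}

func main() {
    freeMilliCPU := map[string]int{"node-1": 4000, "node-2": 1000}
    weight := 1

    var result []hostPriority
    for _, host := range []string{"node-1", "node-2"} {
        result = append(result, leastRequestedMap(freeMilliCPU, host))
    }
    normalizeReduce(result)

    for _, hp := range result {
        fmt.Printf("%s => %d\n", hp.host, hp.score*weight)
    }
}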

选择节点
func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
    if len(priorityList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }

    maxScores := findMaxScores(priorityList)
    ix := int(g.lastNodeIndex % uint64(len(maxScores)))
    g.lastNodeIndex++

    return priorityList[maxScores[ix]].Host, nil
}

经过 Predicates 预选和 Priorities 优选后,需要选出最终的节点:findMaxScores 找出得分最高的节点下标集合,如果最高分的节点有多个,则利用 lastNodeIndex 以 round-robin 的方式从中选择一个。

func findMaxScores(priorityList schedulerapi.HostPriorityList) []int {
    maxScoreIndexes := make([]int, 0, len(priorityList)/2)
    maxScore := priorityList[0].Score
    for i, hp := range priorityList {
        if hp.Score > maxScore {
            maxScore = hp.Score
            maxScoreIndexes = maxScoreIndexes[:0]
            maxScoreIndexes = append(maxScoreIndexes, i)
        } else if hp.Score == maxScore {
            maxScoreIndexes = append(maxScoreIndexes, i)
        }
    }
    return maxScoreIndexes
}

回到 scheduleOne 方法,拿到最终的候选节点后,先通过 assumeVolumes/bindVolumes 完成 Volume 的分配与绑定,最后在独立的 goroutine 中通过 bind 方法完成 pod 与 node 的绑定。

func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
    bindingStart := time.Now()
    err := sched.config.GetBinder(assumed).Bind(b)
    if err := sched.config.SchedulerCache.FinishBinding(assumed); err != nil {
        glog.Errorf("scheduler cache FinishBinding failed: %v", err)
    }
    if err != nil {
        glog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
        if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
            glog.Errorf("scheduler cache ForgetPod failed: %v", err)
        }
        sched.config.Error(assumed, err)
        sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err)
        sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
            Type:   v1.PodScheduled,
            Status: v1.ConditionFalse,
            Reason: "BindingRejected",
        })
        return err
    }

    metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
    metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
    sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name)
    return nil
}

sched.config.GetBinder(assumed).Bind(b) 中 Bind 的实现在 pkg/scheduler/factory/factory.go 中:

func (b *binder) Bind(binding *v1.Binding) error {
    glog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
    return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
}

Scheduler 最后会向 apiserver 发送 Binding 对象;如果绑定失败,则通过 SchedulerCache.ForgetPod 回滚之前 assume 的 pod。至此调度过程结束,真正运行 Pod 的工作交给绑定的 Node 上的 kubelet。

扩展

自定义调度

如果默认的调度器不能满足要求,可以部署自定义的调度器,并在 Pod 的 spec.schedulerName 中指定使用哪一个调度器。Kubernetes 的调度器以插件化的形式实现,方便用户对调度进行定制和二次开发。

定制 Predicates 和 Priority

启动 kube-scheduler 的时候可以使用 --policy-config-file 或 --policy-configmap 参数指定调度策略。

自定义 Predicates 和 Priority

以 Predicates 为例,pkg/scheduler/algorithm/types.go 中定义了 Predicate 需要实现的函数签名:type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)

要实现自定义的 Predicates,可以在 pkg/scheduler/algorithm/predicates/predicates.go 中实现自己的算法,然后在 pkg/scheduler/algorithm/algorithmprovider/defaults/defaults.go 的 defaultPredicates 中注册,或者通过 --policy-config-file / --policy-configmap 引用该算法名即可。
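作为示意,一个最小化的自定义 predicate 大致可以写成下面的样子(NodeHasSSDPredicate 是假设的名字,仅按上述 FitPredicate 签名演示):

// 在 pkg/scheduler/algorithm/predicates/predicates.go 中新增(示意)
// 要求 node 带有 label disktype=ssd,否则预选失败
func NodeHasSSDPredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if node.Labels["disktype"] == "ssd" {
        return true, nil, nil
    }
    return false, []algorithm.PredicateFailureReason{NewFailureReason("node does not have disktype=ssd")}, nil
}

注册时可以在 defaults.go 的 defaultPredicates() 中追加一条 factory.RegisterFitPredicate("NodeHasSSD", predicates.NodeHasSSDPredicate)(同样为示意写法,以实际源码中的注册函数为准)。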

编写自己的调度器组件

从代码中可以了解到,只要调度器的名字不发生冲突,Kubernetes 集群中允许同时运行多个 Scheduler,可以参考文档:Configure Multiple Schedulers。在下面给出的 KubeCon 链接中,也有几个有意思的 Scheduler 实现可以参考。
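下面给出一个极简的"玩具调度器"示意(基于 client-go;my-scheduler 这个名字、kubeconfig 路径以及"随机选节点"的策略都是假设),用来说明自定义调度器的基本工作方式:取出未调度且指定了自己的 pod,挑一个 node,然后创建 Binding:

package main

import (
    "fmt"
    "math/rand"

    "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // 假设的 kubeconfig 路径
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // 取出所有未被调度的 pod(spec.nodeName 为空)
    pods, err := clientset.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{
        FieldSelector: "spec.nodeName=",
    })
    if err != nil {
        panic(err)
    }
    nodes, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    if len(nodes.Items) == 0 {
        panic("no nodes available")
    }

    for _, p := range pods.Items {
        // 只处理声明了 spec.schedulerName: my-scheduler 的 pod
        if p.Spec.SchedulerName != "my-scheduler" {
            continue
        }
        target := nodes.Items[rand.Intn(len(nodes.Items))] // 玩具策略:随机挑一个 node
        binding := &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
            Target:     v1.ObjectReference{Kind: "Node", Name: target.Name},
        }
        if err := clientset.CoreV1().Pods(p.Namespace).Bind(binding); err != nil {
            fmt.Printf("bind %s/%s failed: %v\n", p.Namespace, p.Name, err)
            continue
        }
        fmt.Printf("bound %s/%s to %s\n", p.Namespace, p.Name, target.Name)
    }
}

部署时可以让它与默认调度器同时运行,Pod 只需在 spec.schedulerName 中写上 my-scheduler 即可交给它调度。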

References